---
title: Create Kafka Consumer Code for Event
category:
  id: Code
  label: Code
  icon: Code
inputs:
  - id: event-name
    label: Select the event you want to create a consumer for
    type: resource-list-events
  - id: code-language
    label: What is the programming language of the code?
    type: select
    options:
      - Python
      - Java
      - TypeScript
      - Go
      - Ruby
---

Generate Kafka consumer code in `{{code-language}}` that consumes the event `{{event-name}}`.

**Kafka Cluster Details:**

*   **Bootstrap Servers:** `kafka-prod-1.flowmart.internal:9092,kafka-prod-2.flowmart.internal:9092,kafka-prod-3.flowmart.internal:9092`
*   **Topic Name:** The topic name follows the pattern `fm.events.<event-name>` (e.g., `fm.events.UserSignedUp` for the `UserSignedUp` event).
*   **Consumer Group ID:** Use a descriptive consumer group ID, like `consumer-{{event-name}}-service`.
*   **Security Protocol:** SASL_SSL
*   **SASL Mechanism:** PLAIN

**Best Practices to Follow:**

1.  **Deserialization:** Assume the event payload is JSON and follows FlowMart's payload standards (with `metadata` and `data` fields). Deserialize the message payload into an appropriate data structure or class.
2.  **Error Handling:** Implement robust error handling for connection issues, deserialization failures, and message processing errors. Use a dead-letter queue (DLQ) pattern for messages that cannot be processed after retries. The DLQ topic name is `dlq.fm.events.<event-name>`.
3.  **Configuration:** Externalize Kafka connection details and consumer group ID (don't hardcode them directly in the main logic).
4.  **Logging:** Add basic logging for successful message consumption and any errors encountered.
5.  **Commit Strategy:** Use manual commits (`enable.auto.commit=false`) to ensure messages are only marked as processed after successful handling.
6.  **Idempotency:** Briefly mention how the consumer logic should handle potential duplicate messages (e.g., by checking a database record based on `metadata.correlationId` before processing).

**Task:**

Provide the complete, runnable Kafka consumer code snippet in `{{code-language}}` for the `{{event-name}}` event, incorporating the cluster details and best practices mentioned above. Include necessary imports and basic setup.

If you use any external libraries, please include the import statements and how to install them, step by step, make sure dependencies are listed first.



